# Consumer 源码分析
作者:Ethan.Yang
博客:https://blog.ethanyang.cn (opens new window)
# 1 消费者启动流程
下面结合一个最简单的 Consumer Demo 来分析 RocketMQ 消费者启动全流程:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_consumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TopicTest", "*");
// 重点:这是启动入口
consumer.start();
2
3
4
5
6

# 1.1 DefaultMQPushConsumer.start()
客户端入口方法:
@Override
public void start() throws MQClientException {
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
// ⭐ 核心:调用内部实现类 DefaultMQPushConsumerImpl.start()
this.defaultMQPushConsumerImpl.start();
// (可选)启动消息轨迹
if (traceDispatcher != null) { traceDispatcher.start(...); }
}
2
3
4
5
6
7
8
9
10
真正的消费者启动逻辑是在:
DefaultMQPushConsumerImpl.start()
# 1.2 DefaultMQPushConsumerImpl.start()
该方法内容较多,我们按步骤进行拆解。
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// 1. 校验配置
this.checkConfig();
// 2. 处理订阅关系(并自动生成重试 Topic)
this.copySubscription();
// 3. 获取或创建 MQClientInstance(JVM 共用)
this.mQClientFactory =
MQClientManager.getInstance().getOrCreateMQClientInstance(
this.defaultMQPushConsumer, this.rpcHook);
// ------------------------ 消费者核心组件初始化 ------------------------
// 4. 初始化负载均衡组件(RebalanceImpl)
this.rebalanceImpl.setConsumerGroup(consumerGroup);
this.rebalanceImpl.setMessageModel(messageModel);
this.rebalanceImpl.setAllocateMessageQueueStrategy(strategy);
this.rebalanceImpl.setmQClientFactory(mQClientFactory);
// 5. 创建 PullAPIWrapper(所有模式底层都是“拉”消息)
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory, consumerGroup, isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
// 6. 初始化消费进度 OffsetStore
this.initOffsetStore();
// 7. 加载本地消费进度
this.offsetStore.load();
// 8. 构建消费线程池(顺序 / 并发)
this.initConsumeService();
// 9. 启动消费线程池
this.consumeMessageService.start();
// 10. 注册消费者到 MQClientInstance
boolean registerOK =
mQClientFactory.registerConsumer(consumerGroup, this);
if (!registerOK) { throw new MQClientException("group duplicate", null); }
// 11. 启动 MQClientInstance(网络 + 定时任务 + rebalance)
mQClientFactory.start();
this.serviceState = ServiceState.RUNNING;
break;
}
// -------------------- 启动后的初始化逻辑(非常重要) --------------------
// 12. 更新订阅 Topic 的路由信息
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
// 13. 检测 Broker 与 Client 状态
this.mQClientFactory.checkClientInBroker();
// 14. 向所有 Broker 发送心跳包
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// 15. 触发一次立即重平衡
this.mQClientFactory.rebalanceImmediately();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# 1.3 启动流程总结
1 校验配置
2 加工订阅关系
3 创建获取 MQClientInstance(JVM 共享)
4 初始化负载均衡组件
5 创建 PullAPIWrapper(底层拉模式)
6 创建 OffsetStore(本地 / 远程)
7 加载消费进度 offset
8 根据监听器创建消费线程(顺序/并发)
9 启动消费线程池
10 注册 consumer 到 MQClientInstance
11 启动 MQClientInstance(网络、定时任务、拉取线程)
12 更新订阅路由信息
13 检测客户端在 Broker 的状态
14 发送心跳到所有 Broker
15 立即执行一次重平衡
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 2 消费者模式
RocketMQ 的消费者有两种消息模型:
- 集群消费(Clustering)
- 广播消费(Broadcasting)
两者本质上的区别: 👉 消息是否只会被 Consumer Group 内的一个实例处理 👉 消费进度(Offset)存储在哪儿
# 2.1 集群消费

概念
集群消费意味着:
同一个 Consumer Group 内的多个 Consumer 实例平均分摊消息,一条消息只会被该组内的其中一个实例消费。
适用于:
- 电商下单
- 创建订单
- 扣减库存
- 任何希望只执行一次的业务逻辑
工作原理
假设:
- Topic 有 3 条 MessageQueue(Q0 / Q1 / Q2)
- Consumer Group 内有 3 个实例(C1 / C2 / C3)
那么 Rebalance 时会分配:
| 实例 | 分配到的队列 |
|---|---|
| C1 | Q0 |
| C2 | Q1 |
| C3 | Q2 |
Producer 发送消息时轮询所有 Q,因此消息会平均分散到各个队列,而消费者实例也会“平均消费”。
消费进度(Consumer Offset)存储方式
集群模式下:
Offset 持久化到 Broker(RemoteBrokerOffsetStore)
优势:
- 多实例共享消费进度, 所以存储到Broker中
- 实例重启不会重复消费
- 消费者数量可水平扩容(自动分配队列)
# 2.2 广播消费

概念
广播消费意味着:
同一个 Consumer Group 内的每个实例都会消费所有消息。消息会广播给每个实例各一份。
适用于:
- 配置刷新
- 全量缓存更新
- 各实例独立处理同一份数据的场景
工作原理
广播模式下:
- 每个 Consumer 实例都会订阅 Topic 下所有 MessageQueue
- 因此每条消息会被 Group 内“每个实例”接收一次
换句话说:
| 实例 | 能消费哪些 Q? |
|---|---|
| C1 | Q0 / Q1 / Q2(全部) |
| C2 | Q0 / Q1 / Q2(全部) |
| C3 | Q0 / Q1 / Q2(全部) |
消费进度(Consumer Offset)存储方式
广播模式下:
Offset 持久化到本地文件(LocalFileOffsetStore)
每个实例独立维护 offset,不相互影响。
# 3 Consumer负载均衡
# 3.1 集群模式
在集群消费模式下:
同一个 Consumer Group 内的多个 Consumer 实例需要“分摊消费”一个 Topic 的所有 MessageQueue(MQ)。 一条消息只会被 Group 内其中一个实例消费。
因此,Rebalance 的目标是:
- 让每个 MessageQueue 只被一个实例消费
- 让每个实例尽可能平均地分配到相同数量的 MQ
RocketMQ 采用“主动拉取”模式消费消息,因此每个 Consumer 在拉取时必须明确:
我属于消费组 X,我负责拉取 Topic T 的哪个 MessageQueue?
为了决定这一点,就需要 Rebalance。
# 3.1.1 何时触发 Rebalance?
以下情况都会触发:
- Consumer 实例数量变化(上线 / 下线)
- Topic 的队列数量变化
- Broker 状态变化
- 定时 Rebalance(默认每 20 秒)
Rebalance 的核心逻辑在 RebalanceService.run() 中持续执行。
# 3.1.2 AllocateMessageQueueAveragely
算法说明:
** 平均分配算法, 将 MQ 列表平均切分给各个 Consumer 实例。**
例如:
| Topic 有 MQ | Consumer 实例 | 分配结果 |
|---|---|---|
| 8 条 | 3 个 | C1: 0,1,2 ;C2:3,4,5;C3:6,7 |
特点:
- 完全平均
- 顺序连续
- 最常用的分配算法(默认)
# 3.1.3 AllocateMessageQueueAveragelyByCircle
同样是平均分配,但采用“轮询取队列”的方式:
示例(MQ=8,实例=3):
| 实例 | 分配到的 MQ |
|---|---|
| C1 | 0,3,6 |
| C2 | 1,4,7 |
| C3 | 2,5 |
适用于“避免相邻队列集中在同一实例”的场景。
# 3.1.4 为什么一个 Queue 必须只分给一个实例?
因为 RocketMQ 是 Consumer 主动拉取消息(Pull 模式):
- 如果 Q0 同时分给 C1 和 C2
- 那两者都会从 Q0 拉取消息
- 同一条消息会被两个实例同时消费 → 完全违背集群消费语义
所以设计上保证:
MessageQueue : Consumer 是 N : 1
即一个MessageQueue只能被一个Consumer负责, 但一个 Consumer 实例可以负责多个不同的 Queue。
# 3.1.5 Queue 数量与 Consumer 数量的关系
如果 Consumer 数量 > Queue 数量,多出来的 Consumer 实例将无法分到队列,也就无法消费任何消息。
Example:
| MQ 数量 | Consumer 实例数 | 有效消费实例 | 空闲实例 |
|---|---|---|---|
| 4 | 2 | 全部可消费 | 0 |
| 4 | 4 | 全部可消费 | 0 |
| 4 | 6 | 4 个可消费 | 2 个空闲(没队列可拉) |
因此:
推荐:Queue 数量 ≥ Consumer 实例数
Topic 在创建时应规划合适的 Queue 个数(常见为 4 / 8 / 16 / 32)。
# 3.2 广播模式
广播消费模式下:
Consumer Group 中的每个实例必须消费所有队列的全部消息。 不存在“分摊”概念。
广播模式的特点:
- 每个实例分配 全部 MessageQueue
- 每条消息被 Group 内所有实例消费一次
- Offset 保存在本地,不共享
# 4 并发消费流程
一般我们在消费时使用回调函数的方式,使用得最多的是并发消费,消费者客户端代码如下:
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
Thread.sleep(1000);
System.out.println("收到消息:" + " topic :" + topic
+ " ,tags : " + tags
+ " ,msg : " + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
// 告诉 Broker 稍后重投
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
回调只是最上层入口,真正的消费流程是由 RocketMQ 客户端在内部帮我们完成的,大致可以拆成:
- 获取 Topic 的路由信息(路由发现)
- 获取当前 Group 的 Consumer 列表(做队列负载均衡)
- 为当前 Consumer 分配 MessageQueue,并计算每个 Queue 的消费起点 Offset
- 基于 Pull 模式拉取消息(Push 是“伪推”,底层仍然是拉)
- 投递给消费线程池并回调
MessageListenerConcurrently - 定期持久化消费进度 Offset
- Consumer 关闭 / 注销时的清理
下面分步骤结合源码看一下。
# 4.1 获取 Topic 配置信息(路由信息)
在消费者启动之后,第一步都是从 NameServer 中获取 Topic 相关的路由信息(有哪些 Broker、写队列数、读队列数等)。
这一步是在 MQClientInstance.start() 里完成初始化 + 定时任务的:
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// 如果未显式配置 namesrvAddr,则先拉取 NameServer 地址列表
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// 启动 Remoting 通道(与 NameServer / Broker 建立 Netty 连接)
this.mQClientAPIImpl.start();
// 启动各种定时任务(路由刷新、offset 持久化等)
this.startScheduledTask();
// 启动拉消息服务
this.pullMessageService.start();
// 启动负载均衡服务
this.rebalanceService.start();
...
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
startScheduledTask() 里有两个关键定时任务与路由相关:
private void startScheduledTask() {
// 1)定时拉取 NameServer 地址列表(2 分钟一次)
if (null == this.clientConfig.getNamesrvAddr()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
// 2)30s 一次刷新 Topic 路由信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
updateTopicRouteInfoFromNameServer() 的逻辑是:
把当前所有 Producer / Consumer 关注的 Topic 收集起来,逐个向 NameServer 拉取路由信息:
public void updateTopicRouteInfoFromNameServer() {
Set<String> topicList = new HashSet<>();
// Consumer 订阅的 Topic
{
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
MQConsumerInner impl = it.next().getValue();
if (impl != null) {
Set<SubscriptionData> subList = impl.subscriptions();
if (subList != null) {
for (SubscriptionData subData : subList) {
topicList.add(subData.getTopic());
}
}
}
}
}
// Producer 发布的 Topic
{
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
MQProducerInner impl = it.next().getValue();
if (impl != null) {
topicList.addAll(impl.getPublishTopicList());
}
}
}
for (String topic : topicList) {
this.updateTopicRouteInfoFromNameServer(topic);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
真正向 NameServer 发送请求是在 MQClientAPIImpl.getTopicRouteInfoFromNameServer 里:
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic,
final long timeoutMillis,
boolean allowTopicNotExist)
throws MQClientException, InterruptedException,
RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
requestHeader.setTopic(topic);
// 生产者/消费者向 NameServer 获取路由信息
RemotingCommand request = RemotingCommand.createRequestCommand(
RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);
RemotingCommand response =
this.remotingClient.invokeSync(null, request, timeoutMillis);
...
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
注意:路由刷新是定时任务,并非“实时感知” Broker 状态,所以 Broker 宕机、缩容等情况,客户端在一个短时间窗口内会有感知延迟,需要在业务上做好重试 / 容错。
# 4.2 获取 Group 的 ConsumerList(为负载均衡做准备)
消费者启动后,MQClientInstance 会启动一个专门的负载均衡服务:RebalanceService,间隔一段时间做一次“分配队列”的工作。
MQClientInstance.start()里已经看到:
// 12.2 开启拉消息服务(消费者:线程)
this.pullMessageService.start();
// 12.3 开启负载均衡服务(消费者:线程)
this.rebalanceService.start();
2
3
4
RebalanceService 的核心逻辑很简单:每隔 20s 调用一次 mqClientFactory.doRebalance():
public class RebalanceService extends ServiceThread {
private static long waitInterval =
Long.parseLong(System.getProperty(
"rocketmq.client.rebalance.waitInterval", "20000"));
private final MQClientInstance mqClientFactory;
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
MQClientInstance.doRebalance() 会遍历当前进程内所有 Consumer:
public void doRebalance() {
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try {
impl.doRebalance();
} catch (Throwable e) {
log.error("doRebalance exception", e);
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
对于 DefaultMQPushConsumerImpl,会委托给 RebalanceImpl:
@Override
public void doRebalance() {
if (!this.pause) {
this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
}
}
2
3
4
5
6
在 RebalanceImpl.doRebalance() 中,会遍历当前 Consumer 订阅的所有 Topic,并按 Topic 做 Rebalance:
public void doRebalance(final boolean isOrder) {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
集群 vs 广播:这里开始分叉
广播模式(BROADCASTING): 不需要从 Broker 获取 Consumer 列表,也没有队列级别负载均衡 —— 每个 Consumer 都消费 Topic 下所有队列。
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}",
consumerGroup, topic, mqSet, mqSet);
}
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
集群模式(CLUSTERING):
需要先从 Broker 获取当前 Group 的所有 ConsumerId 列表,然后通过分配策略 AllocateMessageQueueStrategy 做队列分配。
case CLUSTERING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// 从 Broker 拉取该 Group 下所有 ConsumerId
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
...
}
2
3
4
5
6
findConsumerIdList 会先通过 Topic 找到某个 Broker 地址,再请求该 Broker 获取 Consumer 列表:
public List<String> findConsumerIdList(final String topic, final String group) {
String brokerAddr = this.findBrokerAddrByTopic(topic);
if (null == brokerAddr) {
this.updateTopicRouteInfoFromNameServer(topic);
brokerAddr = this.findBrokerAddrByTopic(topic);
}
if (null != brokerAddr) {
try {
return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, 3000);
} catch (Exception e) {
log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e);
}
}
return null;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
生成请求的代码:
public List<String> getConsumerIdListByGroup(final String addr,
final String consumerGroup,
final long timeoutMillis)
throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, MQBrokerException, InterruptedException {
GetConsumerListByGroupRequestHeader requestHeader = new GetConsumerListByGroupRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
// 获取 Group 对应的 ConsumerId 列表
RemotingCommand request = RemotingCommand.createRequestCommand(
RequestCode.GET_CONSUMER_LIST_BY_GROUP, requestHeader);
...
}
2
3
4
5
6
7
8
9
10
11
12
到这里,我们已经:
- 拿到了 Topic 的路由信息(有哪些 Queue)
- 拿到了某个 Group 下所有 ConsumerId(集群模式)
- 通过 Rebalance 做好了“某个 Consumer 分到哪些 Queue”的决定
接下来就进入 “为分配到的每个 Queue 计算消费起点 Offset、并开始拉消息” 的流程。
# 4.3 获取 Queue 的消费 Offset
队列分配完成后,需要为新分配到的 MessageQueue 计算消费起始 Offset。这一步顺带把“拉消息请求”放入拉取队列中。
核心逻辑在 RebalanceImpl.updateProcessQueueTableInRebalance 中,这里简化看关键步骤:
- 找到当前应该分配给本 Consumer 的队列集合
mqSet(Rebalance 得来的) - 和本地
processQueueTable做对比:- 已经不属于本 Consumer 的队列 → 标记为 dropped,停止拉取
- 新分配到的队列 → 计算 Offset,并构造
PullRequest,交给PullMessageService
伪代码结构大致如下(不同版本源码可能略有出入):
boolean updateProcessQueueTableInRebalance(final String topic,
final Set<MessageQueue> mqSet,
final boolean isOrder) {
// this.processQueueTable: <MessageQueue, ProcessQueue>
// 新老对比,找出需要移除和新增的 Queue
// 1. 移除不再属于当前 Consumer 的队列
...
// 2. 为新分配到的队列创建 ProcessQueue,并计算拉取起点 Offset
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pq = new ProcessQueue();
this.processQueueTable.put(mq, pq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(this.consumerGroup);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequest.setNextOffset(nextOffset);
// 提交到 PullMessageService
this.dispatchPullRequest(pullRequest);
} else {
log.warn("doRebalance, {}, computePullFromWhere failed, mq={}",
consumerGroup, mq);
}
}
}
...
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
computePullFromWhere(mq) 就是真正读取 Offset 的地方:
- 集群模式:从 Broker 存储的 offset 中读取(
RemoteBrokerOffsetStore) - 广播模式:从本地文件中读取(
LocalFileOffsetStore)
并根据 consumeFromWhere 策略(CONSUME_FROM_LAST_OFFSET / FIRST_OFFSET / TIMESTAMP)决定:
public long computePullFromWhere(final MessageQueue mq) {
long result = -1;
switch (this.consumeFromWhere) {
case CONSUME_FROM_LAST_OFFSET:
// 优先使用已经存在的 offset;否则从队列尾部开始
...
case CONSUME_FROM_FIRST_OFFSET:
// 没有历史 offset 时,从头开始
...
case CONSUME_FROM_TIMESTAMP:
// 根据时间戳在队列中查找 offset
...
}
return result;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
到这里为止:
每个分配给当前 Consumer 的 MessageQueue 都有了一个起始 Offset,并且生成了对应的 PullRequest,等待拉消息服务去执行。
# 4.4 拉取 Queue 的消息
PullMessageService 是一个专门的线程,不断从内部阻塞队列中取出 PullRequest,然后调用 DefaultMQPushConsumerImpl.pullMessage() 去拉消息。
# 4.4.1 PullMessageService 主循环
public class PullMessageService extends ServiceThread {
private final LinkedBlockingQueue<PullRequest> pullRequestQueue =
new LinkedBlockingQueue<>();
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
// 阻塞获取 PullRequest
PullRequest pullRequest = this.pullRequestQueue.take();
if (pullRequest != null) {
this.pullMessage(pullRequest);
}
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("PullMessageService run error", e);
}
}
log.info(this.getServiceName() + " service end");
}
public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
// 延迟投递的逻辑,略
}
public void executePullRequestImmediately(final PullRequest pullRequest) {
this.pullRequestQueue.put(pullRequest);
}
private void pullMessage(final PullRequest pullRequest) {
final MQConsumerInner consumer = this.mqClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest);
} else {
log.warn("No matched consumer for the PullRequest: {}", pullRequest);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# 4.4.2 DefaultMQPushConsumerImpl.pullMessage
真正发起“拉消息请求”的地方:
public void pullMessage(final PullRequest pullRequest) {
final ProcessQueue processQueue = pullRequest.getProcessQueue();
final MessageQueue messageQueue = pullRequest.getMessageQueue();
if (processQueue.isDropped()) {
log.info("the pull request[{}] is dropped.", pullRequest.toString());
return;
}
// 流控、暂停等校验略...
final long offset = pullRequest.getNextOffset();
// 构建拉取请求,调用 PullAPIWrapper
this.pullAPIWrapper.pullKernelImpl(
messageQueue,
null, // subscriptionData 的表达式
this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(),
offset,
this.defaultMQPushConsumer.getPullBatchSize(),
false,
timeoutMillis,
COMMUNICATION_MODE_ASYNC,
new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
// 拉取结果回调
pullMessageCallback(pullRequest, pullResult);
}
@Override
public void onException(Throwable e) {
// 异常时,稍后再拉
executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
}
);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
在回调 pullMessageCallback 中会:
- 根据 PullResult 状态处理(FOUND / NO_NEW_MSG / NO_MATCHED_MSG 等)
- 把拉取到的消息存入
ProcessQueue - 更新
PullRequest的nextOffset - 提交消费任务到消费线程池(并发消费就是
ConsumeMessageConcurrentlyService)
示意(伪代码):
private void pullMessageCallback(final PullRequest pullRequest,
final PullResult pullResult) {
switch (pullResult.getPullStatus()) {
case FOUND:
// 把消息放入 ProcessQueue
int msgCount = processQueue.putMessage(pullResult.getMsgFoundList());
// 提交消费任务
consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
true);
// 更新下次拉取的 offset
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
// 继续拉下一批
executePullRequestImmediately(pullRequest);
break;
case NO_NEW_MSG:
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_NO_NEW_MSG);
break;
...
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 4.4.3 并发消费:ConsumeMessageConcurrentlyService
并发消费对应的是 ConsumeMessageConcurrentlyService,内部用线程池来并行执行消费回调:
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
private final ThreadPoolExecutor consumeExecutor;
@Override
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume) {
// 略去拆分批次逻辑
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);
}
class ConsumeRequest implements Runnable {
private final List<MessageExt> msgs;
private final ProcessQueue processQueue;
private final MessageQueue messageQueue;
@Override
public void run() {
// 回调用户注册的 MessageListenerConcurrently
ConsumeConcurrentlyStatus status =
messageListener.consumeMessage(msgs, context);
// 根据返回结果处理:成功 / 重投
processConsumeResult(status, context, this);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
这就是 MessageListenerConcurrently 被调用的地方。
# 4.5 更新 Queue 的消费 Offset
偏移量的更新有两个层面:
- 内存中的 offset:每次消费结果处理时更新
- 持久化 offset:定期把内存中的 offset 刷新到 Broker(集群模式)或本地文件(广播模式)
# 4.5.1 消费结果后的 offset 更新
仍然在 ConsumeMessageConcurrentlyService.processConsumeResult 中处理:
private void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest) {
switch (status) {
case CONSUME_SUCCESS:
// 成功消费,计算本次消费的最大 offset
long offset = consumeRequest.getOffset();
// 更新本地 offsetStore 中该 Queue 的内存 offset
this.defaultMQPushConsumerImpl.getOffsetStore()
.updateOffset(consumeRequest.getMessageQueue(), offset, true);
break;
case RECONSUME_LATER:
// 失败则发送重投请求(重试队列),offset 通常不前移或由重试逻辑单独处理
...
break;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
这里的
updateOffset一般只更新到内存结构(例如 ConcurrentMap),不一定立刻刷到远端。
# 4.5.2 定时持久化 offset
真正的持久化是在 MQClientInstance.startScheduledTask() 中的另一个定时任务完成的:
private void startScheduledTask() {
...
// 持久化消费进度:默认每 5s 一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
persistAllConsumerOffset() 会遍历所有 Consumer,调用各自的 offsetStore.persistAll:
- 集群模式:
RemoteBrokerOffsetStore,offset 持久化到 Broker - 广播模式:
LocalFileOffsetStore,offset 持久化到本地文件
# 4.6 注销Consumer
当应用关闭或者你主动调用 consumer.shutdown() 时,需要:
- 停止拉消息服务、负载均衡服务
- 持久化当前 offset
- 从
MQClientInstance的consumerTable中移除 - 向 Broker 发送 UNREGISTER 消费者的请求
在 DefaultMQPushConsumerImpl.shutdown() 中:
public void shutdown() {
this.defaultMQPushConsumerImpl.shutdown();
}
public void shutdown() {
synchronized (this) {
switch (this.serviceState) {
case RUNNING:
this.serviceState = ServiceState.SHUTDOWN_ALREADY;
// 1. 持久化当前消费者的 offset
this.persistConsumerOffset();
// 2. 从 MQClientInstance 中注销当前 Consumer
this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
// 3. 关闭内部各种服务(pullMessageService、rebalanceService 等)
this.mQClientFactory.shutdown();
...
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
unregisterConsumer 内部会发送 UNREGISTER_CLIENT 给 Broker,Broker 侧会从内存结构中移除对应的 Consumer 信息(包括心跳信息、负载均衡信息等)。
# 5 顺序消费流程
顺序消费示例代码:
consumer.registerMessageListener(new MessageListenerOrderly() {
Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
// 开启自动提交
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// 可以看到:同一个 queue 只会被一个消费线程处理
System.out.println("consumeThread=" + Thread.currentThread().getName()
+ ", queueId=" + msg.getQueueId()
+ ", content=" + new String(msg.getBody()));
}
try {
// 模拟业务处理耗时
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (Exception e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
整体骨架和并发消费是一样的:
还是:Rebalance 分配队列 → Pull 拉消息 → 消费线程池回调监听器。
唯一核心差异:*顺序消费多了一套“队列加锁 + 单线程串行消费”的机制,确保*同一个队列同一时刻只被一个消费者线程消费,从而保证队列内消息顺序**。
下面就只强调顺序消费比并发消费多出来的那一块:“锁队列”的实现流程。
# 5.1 顺序消费服务启动:周期性锁队列
顺序消费对应的实现类是 ConsumeMessageOrderlyService。
在 start() 时,如果是集群模式,会启动一个定时任务,定期去 Broker 续锁:
public void start() {
if (MessageModel.CLUSTERING.equals(
ConsumeMessageOrderlyService.this
.defaultMQPushConsumerImpl.messageModel())) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
ConsumeMessageOrderlyService.this.lockMQPeriodically();
}
}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}
}
2
3
4
5
6
7
8
9
10
11
12
默认:每 20 秒执行一次
lockMQPeriodically,锁有效期约 60 秒,即:定期续约队列锁。
lockMQPeriodically() 很简单,直接调用 RebalanceImpl:
public synchronized void lockMQPeriodically() {
if (!this.stopped) {
this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
}
}
2
3
4
5
# 5.2 客户端按 Broker 批量锁队列
RebalanceImpl.lockAll() 会先按 Broker 维度,把当前进程持有的 MessageQueue 分组,然后对每个 Broker 批量发送锁请求:
public void lockAll() {
HashMap<String, Set<MessageQueue>> brokerMqs =
this.buildProcessQueueTableByBrokerName();
for (Entry<String, Set<MessageQueue>> entry : brokerMqs.entrySet()) {
final String brokerName = entry.getKey();
final Set<MessageQueue> mqs = entry.getValue();
if (mqs.isEmpty()) continue;
FindBrokerResult findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(
brokerName, MixAll.MASTER_ID, true);
if (findBrokerResult != null) {
LockBatchRequestBody requestBody = new LockBatchRequestBody();
requestBody.setConsumerGroup(this.consumerGroup);
requestBody.setClientId(this.mQClientFactory.getClientId());
requestBody.setMqSet(mqs);
try {
Set<MessageQueue> lockOKMQSet =
this.mQClientFactory.getMQClientAPIImpl()
.lockBatchMQ(findBrokerResult.getBrokerAddr(),
requestBody, 1000);
// 根据返回结果,把成功锁住的队列标记为 locked
...
} catch (Exception e) {
...
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
发送锁请求的真正实现:
public Set<MessageQueue> lockBatchMQ(final String addr,
final LockBatchRequestBody requestBody,
final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
request.setBody(requestBody.encode());
RemotingCommand response = this.remotingClient.invokeSync(
MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
LockBatchResponseBody responseBody =
LockBatchResponseBody.decode(response.getBody(), LockBatchResponseBody.class);
// Broker 返回成功加锁的 MessageQueue 集合
return responseBody.getLockOKMQSet();
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
关键点:
- 锁是 Broker 端维护的(哪个 Group 的哪个 clientId 拿到了某个队列的锁);
- 客户端只是在本地 ProcessQueue 上标记“是否已锁定”,没有抢到锁的队列不会真正开始消费。
# 5.3 消费端如何利用“队列锁”保证顺序?
顺序消费下,整体流程还是:
PullMessageService 拉消息 → 放入 ProcessQueue → 丢给 ConsumeMessageOrderlyService 的线程池。
区别在于:
- 只有拿到锁的队列才会被拉取 + 提交消费任务;
- 同一个队列只会由一个消费线程串行执行;
- 如果锁失效 / rebalance 导致队列被分配给别的实例,当前实例会把
ProcessQueue标记为 dropped,此后不再拉取或消费。
所以顺序保证是按队列维度的:
- 同一个 MessageQueue:加锁 + 单线程串行消费 → 有序
- 不同 MessageQueue:互相独立,可以并行消费 → 全局无序,但“分区内有序”
这也就是常说的:
RocketMQ 只能保证“分区(队列)内顺序”,而不能保证“Topic 全局顺序”。
# 5.4 关闭顺序消费:释放队列锁
在顺序消费服务 ConsumeMessageOrderlyService.shutdown() 中,会优雅关闭线程池,并在集群模式下释放所有队列锁:
public void shutdown(long awaitTerminateMillis) {
this.stopped = true;
this.scheduledExecutorService.shutdown();
ThreadUtils.shutdownGracefully(this.consumeExecutor,
awaitTerminateMillis, TimeUnit.MILLISECONDS);
if (MessageModel.CLUSTERING.equals(
this.defaultMQPushConsumerImpl.messageModel())) {
this.unlockAllMQ();
}
}
2
3
4
5
6
7
8
9
10
11
解锁逻辑同样委托给 RebalanceImpl:
public synchronized void unlockAllMQ() {
this.defaultMQPushConsumerImpl.getRebalanceImpl().unlockAll(false);
}
2
3
unlockAll 和加锁类似,按 Broker 分组,然后批量发送 UNLOCK 请求:
public void unlockAll(final boolean oneway) {
HashMap<String, Set<MessageQueue>> brokerMqs =
this.buildProcessQueueTableByBrokerName();
for (Entry<String, Set<MessageQueue>> entry : brokerMqs.entrySet()) {
final String brokerName = entry.getKey();
final Set<MessageQueue> mqs = entry.getValue();
if (mqs.isEmpty()) continue;
FindBrokerResult findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(
brokerName, MixAll.MASTER_ID, true);
if (findBrokerResult != null) {
UnlockBatchRequestBody requestBody = new UnlockBatchRequestBody();
requestBody.setConsumerGroup(this.consumerGroup);
requestBody.setClientId(this.mQClientFactory.getClientId());
requestBody.setMqSet(mqs);
this.mQClientFactory.getMQClientAPIImpl()
.unlockBatchMQ(findBrokerResult.getBrokerAddr(),
requestBody, 1000, oneway);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
unlockBatchMQ 内部就是发一个 UNLOCK_BATCH_MQ 请求给 Broker,Broker 端删除对应的锁记录。
# 6 消费卡死
在 RocketMQ 顺序消费中,最容易让人产生“卡死”、”不动了“的现象,主要有两类原因:
- 队列锁(Queue Lock)被占用 / 未释放
- Broker 主从切换(Master 挂了导致锁丢失)
下面把逻辑讲完整。
# 6.1 队列锁导致的“消费卡死”
顺序消费和并发消费最大的区别是:
同一个 MessageQueue 必须严格串行消费。 为此,RocketMQ 会在 Broker 端给队列加一个“分布式锁”。
流程如下:
- 消费者 A 请求消费 Queue-0 → Broker 给它加锁;
- 消费者 A 一直持有该队列的锁,其他消费者无法消费 Queue-0;
- 锁的默认有效期是 60 秒,消费者客户端每 20 秒续约一次;
如果消费者 A 在消费中途挂掉了,会发生什么?
- Broker 会认为 A 还持有锁(因为锁还没过期)
- 在锁的 60 秒 TTL 内,其他消费者 B 无法获得该队列的锁
结果就是:
B 收到 Queue-0,但拉消息却一直失败 → 会表现为“消费卡死/不动了”。
直到锁在 Broker 上自然过期,另一个消费者才能重新获得锁。
简单总结
- 顺序消费 = Broker 端队列锁
- Consumer 异常退出 → 锁无法立即释放,只能等 TTL 过期
- 在此期间,队列无法被其他实例消费 → “卡死”
这是顺序消费的常见现象,并非 Bug,而是锁的特性。
# 6.2 Broker 主从切换导致的“卡死”
RocketMQ 顺序消息要求:
- 加锁一定在 Master 上进行
- 消费失败也一定走 Master 处理
原因:锁记录只在 Master 内存中维护,不会同步到 Slave。
如果 Master 挂了,会发生什么?
假设如下场景:
- Consumer A 已经从 Master 获得了 Queue-0 的锁;
- Master 突然宕机;
- Consumer B 想切到 Slave 继续消费 Queue-0;
但问题是:
Slave 上没有锁记录,也没有锁系统。 Slave 模式下,不提供加锁能力。
所以会出现:
- Consumer B 发起锁请求 → 无响应 / 失败
- Consumer B 拉不到消息 → 消费能力暂停
- 直到 Master 重新恢复(或 HA 切主成功)
因此顺序消息在主从切换时会发生:
短暂的不可消费状态(类似“挂起”)→ 表现为消费卡住不动。
# 6.3 总结与设计原因
| 场景 | 根本原因 | 表现 |
|---|---|---|
| Consumer 异常退出 | Broker 的队列锁 TTL(60s)未过期 | 其他消费者无法接管该队列,消费暂停 60s |
| Master 挂掉,Slave 接管 | 加锁逻辑只在 Master,中途无法加锁 | 顺序队列在主从切换窗口不可消费 |
核心原因: 顺序消息的“顺序性大于可用性”。
要保证一条队列上的消息严格顺序,就必须做到:
- 一个队列只能由一个消费线程消费
- 任意时刻不能有两个消费者同时处理它
- 锁要能防止“脏接管”(即前一个实例还没释放,别人不能抢)
也就是说:
顺序消费的强一致性必然牺牲部分可用性。
# 6.4 解决方案
消费卡死原因不在于RocketMQ架构设计, 而在于消费端或者集群节点不稳定
手动控制
lockInterval和lockExpireRocketMQ 客户端可调节定时续锁频率(20s)与锁 TTL(60s),但一般不推荐过度修改。
避免业务阻塞
尽量减少使用顺序消费的业务场景
# 7 启动之后较长时间才消费
启动 RocketMQ 消费者后,常会发现需要一段时间(几秒甚至几十秒)才能正式开始消费。主要原因有两类:
并发消费:启动流程步骤多、成本高
PushConsumer 启动后,需要完成大量初始化动作:
- 从 NameServer 拉取 Topic 路由
- 建立与 Broker 的网络连接
- 启动拉消息线程(PullMessageService)
- 启动负载均衡线程(RebalanceService)
- 根据 Group 做队列负载均衡
- 为每个队列计算 offset
- 构造并提交首个 PullRequest
- 消费线程池初始化
尤其是:
- 订阅的 Topic 多
- 队列数量多
- Consumer 实例数量多(需要协调 Rebalance)
时,这一整套过程会更慢,所以消费者会出现启动后延迟几秒开始消费的现象。
**顺序消费:队列锁导致的“冷启动等待” **
顺序消费额外依赖Broker 端的队列锁。
如果之前的 Consumer 异常退出,Broker 上的锁不会立刻释放,需要等待:
默认 60 秒的锁有效期自动过期
因此,新启动的 Consumer 想要消费这个队列时:
- 会先尝试加锁
- 如果上一实例的锁还没过期 → 加锁失败
- 只能等待下一次续锁时机(20s)+ 锁过期(最长 60s)
最终表现就是:
顺序消息消费者启动后要等几十秒才能开始消费,看起来像“卡住了”。